/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.message;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import static net.sourceforge.argparse4j.impl.Arguments.store;

/**
 * The Kafka message generator.
 */
public final class MessageGenerator {
    static final String JSON_SUFFIX = ".json";

    static final String JSON_GLOB = "*" + JSON_SUFFIX;

    static final String JAVA_SUFFIX = ".java";

    static final String API_MESSAGE_TYPE_JAVA = "ApiMessageType.java";

    static final String API_SCOPE_JAVA = "ApiScope.java";

    static final String COORDINATOR_RECORD_TYPE_JAVA = "CoordinatorRecordType.java";

    static final String COORDINATOR_RECORD_JSON_CONVERTERS_JAVA = "CoordinatorRecordJsonConverters.java";

    static final String METADATA_RECORD_TYPE_JAVA = "MetadataRecordType.java";

    static final String METADATA_JSON_CONVERTERS_JAVA = "MetadataJsonConverters.java";

    static final String API_MESSAGE_CLASS = "org.apache.kafka.common.protocol.ApiMessage";

    static final String MESSAGE_CLASS = "org.apache.kafka.common.protocol.Message";

    static final String MESSAGE_UTIL_CLASS = "org.apache.kafka.common.protocol.MessageUtil";

    static final String READABLE_CLASS = "org.apache.kafka.common.protocol.Readable";

    static final String WRITABLE_CLASS = "org.apache.kafka.common.protocol.Writable";

    static final String ARRAYS_CLASS = "java.util.Arrays";

    static final String OBJECTS_CLASS = "java.util.Objects";

    static final String LIST_CLASS = "java.util.List";

    static final String ARRAYLIST_CLASS = "java.util.ArrayList";

    static final String IMPLICIT_LINKED_HASH_COLLECTION_CLASS =
        "org.apache.kafka.common.utils.ImplicitLinkedHashCollection";

    static final String IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS =
        "org.apache.kafka.common.utils.ImplicitLinkedHashMultiCollection";

    static final String UNSUPPORTED_VERSION_EXCEPTION_CLASS =
        "org.apache.kafka.common.errors.UnsupportedVersionException";

    static final String ITERATOR_CLASS = "java.util.Iterator";

    static final String ENUM_SET_CLASS = "java.util.EnumSet";

    static final String TYPE_CLASS = "org.apache.kafka.common.protocol.types.Type";

    static final String FIELD_CLASS = "org.apache.kafka.common.protocol.types.Field";

    static final String SCHEMA_CLASS = "org.apache.kafka.common.protocol.types.Schema";

    static final String NULLABLE_SCHEMA_CLASS = "org.apache.kafka.common.protocol.types.NullableSchema";

    static final String ARRAYOF_CLASS = "org.apache.kafka.common.protocol.types.ArrayOf";

    static final String COMPACT_ARRAYOF_CLASS = "org.apache.kafka.common.protocol.types.CompactArrayOf";

    static final String BYTES_CLASS = "org.apache.kafka.common.utils.Bytes";

    static final String UUID_CLASS = "org.apache.kafka.common.Uuid";

    static final String BASE_RECORDS_CLASS = "org.apache.kafka.common.record.BaseRecords";

    static final String MEMORY_RECORDS_CLASS = "org.apache.kafka.common.record.MemoryRecords";

    static final String REQUEST_SUFFIX = "Request";

    static final String RESPONSE_SUFFIX = "Response";

    static final String BYTE_UTILS_CLASS = "org.apache.kafka.common.utils.ByteUtils";

    static final String STANDARD_CHARSETS = "java.nio.charset.StandardCharsets";

    static final String TAGGED_FIELDS_SECTION_CLASS = "org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection";

    static final String OBJECT_SERIALIZATION_CACHE_CLASS = "org.apache.kafka.common.protocol.ObjectSerializationCache";

    static final String MESSAGE_SIZE_ACCUMULATOR_CLASS = "org.apache.kafka.common.protocol.MessageSizeAccumulator";

    static final String RAW_TAGGED_FIELD_CLASS = "org.apache.kafka.common.protocol.types.RawTaggedField";

    static final String RAW_TAGGED_FIELD_WRITER_CLASS = "org.apache.kafka.common.protocol.types.RawTaggedFieldWriter";

    static final String TREE_MAP_CLASS = "java.util.TreeMap";

    static final String BYTE_BUFFER_CLASS = "java.nio.ByteBuffer";

    static final String NAVIGABLE_MAP_CLASS = "java.util.NavigableMap";

    static final String MAP_ENTRY_CLASS = "java.util.Map.Entry";

    static final String JSON_NODE_CLASS = "com.fasterxml.jackson.databind.JsonNode";

    static final String OBJECT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ObjectNode";

    static final String JSON_NODE_FACTORY_CLASS = "com.fasterxml.jackson.databind.node.JsonNodeFactory";

    static final String BOOLEAN_NODE_CLASS = "com.fasterxml.jackson.databind.node.BooleanNode";

    static final String SHORT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ShortNode";

    static final String INT_NODE_CLASS = "com.fasterxml.jackson.databind.node.IntNode";

    static final String LONG_NODE_CLASS = "com.fasterxml.jackson.databind.node.LongNode";

    static final String TEXT_NODE_CLASS = "com.fasterxml.jackson.databind.node.TextNode";

    static final String BINARY_NODE_CLASS = "com.fasterxml.jackson.databind.node.BinaryNode";

    static final String NULL_NODE_CLASS = "com.fasterxml.jackson.databind.node.NullNode";

    static final String ARRAY_NODE_CLASS = "com.fasterxml.jackson.databind.node.ArrayNode";

    static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode";

    static final long UNSIGNED_INT_MAX = 4294967295L;

    static final int UNSIGNED_SHORT_MAX = 65535;

    /**
     * The Jackson serializer we use for JSON objects.
     */
    public static final ObjectMapper JSON_SERDE;

    static {
        JSON_SERDE = new ObjectMapper();
        JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
        JSON_SERDE.configure(DeserializationFeature.FAIL_ON_TRAILING_TOKENS, true);
        JSON_SERDE.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
        JSON_SERDE.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY);
        JSON_SERDE.registerModule(new Jdk8Module());
    }

    private static List<TypeClassGenerator> createTypeClassGenerators(String packageName,
                                                                      List<String> types) {
        if (types == null) return Collections.emptyList();
        List<TypeClassGenerator> generators = new ArrayList<>();
        for (String type : types) {
            switch (type) {
                case "ApiMessageTypeGenerator":
                    generators.add(new ApiMessageTypeGenerator(packageName));
                    break;
                case "MetadataRecordTypeGenerator":
                    generators.add(new MetadataRecordTypeGenerator(packageName));
                    break;
                case "MetadataJsonConvertersGenerator":
                    generators.add(new MetadataJsonConvertersGenerator(packageName));
                    break;
                case "CoordinatorRecordTypeGenerator":
                    generators.add(new CoordinatorRecordTypeGenerator(packageName));
                    break;
                case "CoordinatorRecordJsonConvertersGenerator":
                    generators.add(new CoordinatorRecordJsonConvertersGenerator(packageName));
                    break;
                default:
                    throw new RuntimeException("Unknown type class generator type '" + type + "'");
            }
        }
        return generators;
    }

    private static List<MessageClassGenerator> createMessageClassGenerators(String packageName,
                                                                            List<String> types) {
        if (types == null) return Collections.emptyList();
        List<MessageClassGenerator> generators = new ArrayList<>();
        for (String type : types) {
            switch (type) {
                case "MessageDataGenerator":
                    generators.add(new MessageDataGenerator(packageName));
                    break;
                case "JsonConverterGenerator":
                    generators.add(new JsonConverterGenerator(packageName));
                    break;
                default:
                    throw new RuntimeException("Unknown message class generator type '" + type + "'");
            }
        }
        return generators;
    }

    public static void processDirectories(String packageName,
                                          String outputDir,
                                          String inputDir,
                                          List<String> typeClassGeneratorTypes,
                                          List<String> messageClassGeneratorTypes) throws Exception {
        Files.createDirectories(Paths.get(outputDir));
        int numProcessed = 0;

        List<TypeClassGenerator> typeClassGenerators = createTypeClassGenerators(packageName, typeClassGeneratorTypes);
        Set<String> outputFileNames = new HashSet<>();
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(inputDir), JSON_GLOB)) {
            for (Path inputPath : directoryStream) {
                try {
                    MessageSpec spec = JSON_SERDE.readValue(inputPath.toFile(), MessageSpec.class);
                    outputFileNames.addAll(
                        generateAndWriteMessageClasses(spec, packageName, outputDir, messageClassGeneratorTypes));
                    numProcessed++;
                    typeClassGenerators.forEach(generator -> generator.registerMessageType(spec));
                } catch (Exception e) {
                    throw new RuntimeException("Exception while processing " + inputPath.toString(), e);
                }
            }
        }
        for (TypeClassGenerator typeClassGenerator : typeClassGenerators) {
            outputFileNames.add(typeClassGenerator.outputName());
            generateAndWriteTypeClasses(outputDir, typeClassGenerator);
        }

        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(outputDir))) {
            for (Path outputPath : directoryStream) {
                Path fileName = outputPath.getFileName();
                if (fileName != null) {
                    if (!outputFileNames.contains(fileName.toString())) {
                        Files.delete(outputPath);
                    }
                }
            }
        }
        System.out.printf("MessageGenerator: processed %d Kafka message JSON file(s).%n", numProcessed);
    }

    /**
     * Generate and write message classes.
     * @return output file names.
     */
    static Set<String> generateAndWriteMessageClasses(MessageSpec spec,
                                                      String packageName,
                                                      String outputDir,
                                                      List<String> messageClassGeneratorTypes) throws Exception {
        var outputFileNames = new HashSet<String>();
        // Only generate `Data` classes for apis that have valid version(s)
        if (spec.hasValidVersion()) {
            List<MessageClassGenerator> generators = createMessageClassGenerators(packageName, messageClassGeneratorTypes);
            for (MessageClassGenerator generator : generators) {
                String name = generator.outputName(spec) + JAVA_SUFFIX;
                outputFileNames.add(name);
                Path outputPath = Paths.get(outputDir, name);
                try (BufferedWriter writer = Files.newBufferedWriter(outputPath, StandardCharsets.UTF_8)) {
                    generator.generateAndWrite(spec, writer);
                }
            }
        }
        return outputFileNames;
    }

    static void generateAndWriteTypeClasses(String outputDir, TypeClassGenerator typeClassGenerator) throws IOException {
        Path factoryOutputPath = Paths.get(outputDir, typeClassGenerator.outputName());
        try (BufferedWriter writer = Files.newBufferedWriter(factoryOutputPath, StandardCharsets.UTF_8)) {
            typeClassGenerator.generateAndWrite(writer);
        }
    }

    public static String capitalizeFirst(String string) {
        if (string.isEmpty()) {
            return string;
        }
        return string.substring(0, 1).toUpperCase(Locale.ENGLISH) +
            string.substring(1);
    }

    static String lowerCaseFirst(String string) {
        if (string.isEmpty()) {
            return string;
        }
        return string.substring(0, 1).toLowerCase(Locale.ENGLISH) +
            string.substring(1);
    }

    static boolean firstIsCapitalized(String string) {
        if (string.isEmpty()) {
            return false;
        }
        return Character.isUpperCase(string.charAt(0));
    }

    static String toSnakeCase(String string) {
        StringBuilder bld = new StringBuilder();
        boolean prevWasCapitalized = true;
        for (int i = 0; i < string.length(); i++) {
            char c = string.charAt(i);
            if (Character.isUpperCase(c)) {
                if (!prevWasCapitalized) {
                    bld.append('_');
                }
                bld.append(Character.toLowerCase(c));
                prevWasCapitalized = true;
            } else {
                bld.append(c);
                prevWasCapitalized = false;
            }
        }
        return bld.toString();
    }

    static String stripSuffix(String str, String suffix) {
        if (str.endsWith(suffix)) {
            return str.substring(0, str.length() - suffix.length());
        } else {
            throw new RuntimeException("String " + str + " does not end with the " +
                "expected suffix " + suffix);
        }
    }

    /**
     * Return the number of bytes needed to encode an integer in unsigned variable-length format.
     */
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0L) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("message-generator")
            .defaultHelp(true)
            .description("The Kafka message generator");
        parser.addArgument("--package", "-p")
            .action(store())
            .required(true)
            .metavar("PACKAGE")
            .help("The java package to use in generated files.");
        parser.addArgument("--output", "-o")
            .action(store())
            .required(true)
            .metavar("OUTPUT")
            .help("The output directory to create.");
        parser.addArgument("--input", "-i")
            .action(store())
            .required(true)
            .metavar("INPUT")
            .help("The input directory to use.");
        parser.addArgument("--typeclass-generators", "-t")
            .nargs("+")
            .action(store())
            .metavar("TYPECLASS_GENERATORS")
            .help("The type class generators to use, if any.");
        parser.addArgument("--message-class-generators", "-m")
            .nargs("+")
            .action(store())
            .metavar("MESSAGE_CLASS_GENERATORS")
            .help("The message class generators to use.");
        Namespace res = parser.parseArgsOrFail(args);
        processDirectories(res.getString("package"), res.getString("output"),
            res.getString("input"), res.getList("typeclass_generators"),
            res.getList("message_class_generators"));
    }
}
